read_csv and read_parquet for review #1
Conversation
python/ray/dataframe/io.py (outdated)

```python
from .dataframe import from_pandas, ray, DataFrame


def read_parquet(path, columns=None, npartitions=None, chunksize=None):
```
Let's try to stay with the pandas signature: `pandas.read_parquet(path, engine='auto', columns=None, **kwargs)`.
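For concreteness, a minimal sketch of what conforming to that signature might look like; the body here is an assumption for illustration, not the PR's actual code, and it presumes a from_pandas helper like the one imported above:

```python
import pandas as pd


def read_parquet(path, engine='auto', columns=None, **kwargs):
    # Delegate the actual parsing to pandas (requires pyarrow or
    # fastparquet), then distribute the result across partitions.
    pandas_df = pd.read_parquet(path, engine=engine, columns=columns, **kwargs)
    return from_pandas(pandas_df, npartitions=None)
```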
Makes sense. Where should npartitions and chunksize go?
After some thought:
- The best way to do it is to set defaults, since that keeps the API the same and users don't need to worry about which arguments Ray needs. chunksize and npartitions are still there as keyword arguments for fine-tuning.
- Example:

```python
# inside ray/dataframe/dataframe.py
DEFAULT_NPARTITIONS = 2
DEFAULT_CHUNKSIZE = 20


def set_npartition_default(n):
    global DEFAULT_NPARTITIONS
    DEFAULT_NPARTITIONS = n


def set_chunksize_default(s):
    global DEFAULT_CHUNKSIZE
    DEFAULT_CHUNKSIZE = s


def from_pandas(df, npartitions=None, chunksize=None):
    if not npartitions and not chunksize:
        npartitions = DEFAULT_NPARTITIONS
    ...
```

- This scheme is also future-proof: if we later devise a way to set the default chunksize automatically for users, we can do it dynamically inside set_npartition_default.
I think this makes a lot of sense. We have typically been creating 1x the number of virtual CPUs, but it has been manual. Ray knows how many CPUs it can access, so we just need to use that. We can tune this a bit ourselves for the average case and leave the rest to users.
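A hedged sketch of that idea, assuming `ray.cluster_resources()` is available and reusing the set_npartition_default helper from the example above:

```python
import ray


def set_npartition_default_from_cluster():
    # Ray reports the resources it can access; default to one partition
    # per virtual CPU, matching the 1x heuristic mentioned above.
    num_cpus = int(ray.cluster_resources().get("CPU", 1))
    set_npartition_default(num_cpus)
```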
python/ray/dataframe/io.py (outdated)

```python
return from_pandas(pd.read_csv(BytesIO(to_read)), npartitions=1)


def read_csv(path, npartitions, **kwargs):
```
Let's try to keep the pandas signature: `pandas.read_csv(filepath_or_buffer, sep=', ', delimiter=None, header='infer', names=None, index_col=None, usecols=None, squeeze=False, prefix=None, mangle_dupe_cols=True, dtype=None, engine=None, converters=None, true_values=None, false_values=None, skipinitialspace=False, skiprows=None, nrows=None, na_values=None, keep_default_na=True, na_filter=True, verbose=False, skip_blank_lines=True, parse_dates=False, infer_datetime_format=False, keep_date_col=False, date_parser=None, dayfirst=False, iterator=False, chunksize=None, compression='infer', thousands=None, decimal=b'.', lineterminator=None, quotechar='"', quoting=0, escapechar=None, comment=None, encoding=None, dialect=None, tupleize_cols=None, error_bad_lines=True, warn_bad_lines=True, skipfooter=0, skip_footer=0, doublequote=True, delim_whitespace=False, as_recarray=None, compact_ints=None, use_unsigned=None, low_memory=True, buffer_lines=None, memory_map=False, float_precision=None)`.
If there are things that aren't implemented yet, you can just raise a NotImplementedError for them.
😱. These can all be passed through to pd.read_csv(...), so there won't be any NotImplementedError.
But the same question again about npartitions and chunksize.
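A minimal sketch of that pass-through, assuming a from_pandas helper as above (the signature here is illustrative, not the PR's final one):

```python
import pandas as pd


def read_csv(path, npartitions=None, chunksize=None, **kwargs):
    # Every pandas keyword (sep, header, dtype, ...) is forwarded
    # verbatim, so nothing needs to raise NotImplementedError.
    pandas_df = pd.read_csv(path, **kwargs)
    return from_pandas(pandas_df, npartitions=npartitions, chunksize=chunksize)
```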
See the update in read_parquet above.
I agree about all the issues regarding the number of parameters, but our eventual goal is `import ray.dataframe as pd`, with that being the only change users need to make.
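The intended usage would then look like this; the file name is hypothetical:

```python
import ray.dataframe as pd  # the only line that changes vs. plain pandas

df = pd.read_csv("data.csv")  # same call signature as pandas.read_csv
```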
python/ray/dataframe/test/test_io.py (outdated)

```python
@pytest.fixture
def ray_df_equals_pandas_wo_index(ray_df, pandas_df):
```
Do we need an `__eq__()` on the index to check that they are exactly the same?
The issue with the index is that pandas keeps a continuous range index, while the Ray dataframe's partitions each start their index at 0. I'll change the test back to ray_df_equals_pandas once the new index scheme is merged.
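For reference, a sketch of what the index-agnostic comparison could look like, assuming the Ray dataframe exposes a to_pandas()-style conversion (that helper name is an assumption):

```python
def ray_df_equals_pandas_wo_index(ray_df, pandas_df):
    # Compare values and columns only; drop both indexes so the
    # per-partition restart at 0 doesn't cause spurious mismatches.
    return ray_df.to_pandas().reset_index(drop=True).equals(
        pandas_df.reset_index(drop=True))
```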
nvm. Just saw it merged. Working on refining this now.
We can have an `Index.equals(other)` and an `__eq__(other)` that do different things based on the type of other.
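A rough sketch of that dual behavior, assuming a custom Index class (all names here are illustrative, not the project's actual API):

```python
class Index:
    def __init__(self, values):
        self.values = list(values)

    def equals(self, other):
        # Strict whole-index check: same labels, same order.
        return isinstance(other, Index) and self.values == other.values

    def __eq__(self, other):
        if isinstance(other, Index):
            # Elementwise comparison against another Index.
            return [a == b for a, b in zip(self.values, other.values)]
        # Anything else is treated as a scalar and broadcast.
        return [v == other for v in self.values]
```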
Strange, it looks like GitHub ate my comments from yesterday, so I just resubmitted them.